package com.pk.flink.functions.source;

import com.pk.flink.bean.Access;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class AccecssFlatMapFunction implements FlatMapFunction<String,Access> {
    @Override
    public void flatMap(String str, Collector<Access> collector) throws Exception {
        String[] splits = str.split(",");
        Access access = new Access(Long.parseLong(splits[0]), splits[1], Double.parseDouble(splits[2]));
        if(access.getTraffic() < 200) {

        }
        else if(access.getTraffic() == 200) {
            collector.collect(access);
        }
        else {
            collector.collect(access);
            collector.collect(access);
        }
    }
}
